package com.xiaofan.apitest.window

import com.xiaofan.apitest.source.SensorReading
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object WindowTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val inputStream: DataStream[String] = env.socketTextStream("192.168.1.27", 9999)

    val dataStream: DataStream[SensorReading] = inputStream.map(
      data => {
        val arr: Array[String] = data.split(",")
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
      }
    )

    // 每15秒统计一次，窗口内各传感器所有温度的最小值
    val resultStream: DataStream[SensorReading] = dataStream
      .keyBy(_.id)
      .timeWindow(Time.seconds(15))
      //      .window(TumblingEventTimeWindows.of(Time.seconds(15)))
      // .minBy("temperature") 温度最小的那条记录及相关属性
      .reduce { (curState, newData) => SensorReading(curState.id, newData.timestamp, curState.temperature.min(newData.temperature)) } // 温度最小值，及最新时间

    resultStream.print()

    env.execute("window test")


  }
}
